package com.bieber.registry;

import com.bieber.common.Constants;
import com.bieber.registry.config.RegistryConfig;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;

import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * {@link Registry} implementation backed by ZooKeeper through Apache Curator.
 *
 * <p>The Curator client is built from a {@link RegistryConfig} (connect string, namespace,
 * timeouts and a {@link RetryUntilElapsed} retry policy) and started in the constructor.
 * Subscriptions are implemented with one {@link PathChildrenCache} per subscribed path; every
 * cache and listener created by this registry is tracked per instance so that {@link #stop()}
 * only affects this registry.
 *
 * Created by bieber on 2015/8/20.
 */
public class ZookeeperRegistry implements Registry {

    private final CuratorFramework client;

    private volatile boolean running=false;

    /** Listener per subscribed path; guarantees a single {@link PathChildrenCache} per path. */
    private final ConcurrentHashMap<String,PathChildrenCacheListenerImpl> subscribeMap = new ConcurrentHashMap<String, PathChildrenCacheListenerImpl>();

    /** Every started cache, kept so it can be rebuilt on reconnect and closed on stop. */
    private final LinkedBlockingQueue<PathChildrenCache> pathChildrenCaches = new LinkedBlockingQueue<PathChildrenCache>();

    /**
     * Builds and starts the Curator client described by {@code config}.
     *
     * @param config registry settings; {@code getZookeeperConnections()} must not be empty
     * @throws IllegalArgumentException if no zookeeper connection string is configured
     */
    public ZookeeperRegistry(RegistryConfig config) {
        if(StringUtils.isEmpty(config.getZookeeperConnections())){
            throw new IllegalArgumentException("zookeeper connections must be set");
        }
        client = CuratorFrameworkFactory.builder()
                .connectString(config.getZookeeperConnections())
                .namespace(config.getRoot())
                .connectionTimeoutMs(config.getConnectionTimeout())
                .sessionTimeoutMs(config.getSessionTimeout())
                .retryPolicy(new RetryUntilElapsed(config.getMaxElapsedTime(),config.getSleepMsBetweenRetries()))
                .build();
        client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                if(newState!=ConnectionState.RECONNECTED){
                    return;
                }
                Constants.COMMON_LOGGER.info("reconnected zookeeper.....");
                // Rebuild each cache independently so one failure does not skip the others.
                for(PathChildrenCache pathChildrenCache:pathChildrenCaches){
                    try {
                        pathChildrenCache.rebuild();
                    } catch (Exception e) {
                        Constants.COMMON_LOGGER.warn("zookeeper connection had reconnected,rebuild pathchildrencace occur an exception",e);
                    }
                }
            }
        });
        client.start();
        running=true;
    }

    /**
     * Rejects {@code null} or empty paths.
     *
     * @throws IllegalArgumentException if {@code path} is empty
     */
    private void validatePath(String path){
        if(StringUtils.isEmpty(path)){
            throw new IllegalArgumentException("path must not be empty");
        }
    }

    /** Creates a persistent node at {@code path}, ignoring an already existing node. */
    @Override
    public void staticRegister(String path) {
        staticRegister(path,true);
    }

    /** Creates an ephemeral node at {@code path}, ignoring an already existing node. */
    @Override
    public void dynamicRegister(String path) {
        dynamicRegister(path,true);
    }

    /**
     * Creates a persistent node at {@code path}.
     *
     * @param path        node path, must not be empty
     * @param ignoreExist when {@code true} an existing node is only logged; otherwise it is an error
     * @throws IllegalArgumentException if the path is empty, the node exists and
     *                                  {@code ignoreExist} is {@code false}, or creation fails
     */
    @Override
    public void staticRegister(String path, boolean ignoreExist) {
        register(path,CreateMode.PERSISTENT,ignoreExist);
    }

    /**
     * Creates an ephemeral node at {@code path}.
     *
     * @param path        node path, must not be empty
     * @param ignoreExist when {@code true} an existing node is only logged; otherwise it is an error
     * @throws IllegalArgumentException if the path is empty, the node exists and
     *                                  {@code ignoreExist} is {@code false}, or creation fails
     */
    @Override
    public void dynamicRegister(String path, boolean ignoreExist) {
        register(path,CreateMode.EPHEMERAL,ignoreExist);
    }

    /** Shared implementation of the register methods; they differ only in the node's create mode. */
    private void register(String path, CreateMode mode, boolean ignoreExist){
        validatePath(path);
        try {
            client.create().withMode(mode).forPath(path);
        }catch (KeeperException.NodeExistsException e){
            if(!ignoreExist){
                // Keep the ZooKeeper exception as cause so the failure can be diagnosed.
                throw new IllegalArgumentException("path "+path+" is exist ",e);
            }
            Constants.COMMON_LOGGER.warn(e.getMessage());
        }catch (Exception e) {
            throw new IllegalArgumentException("create path "+path+" occur an exception",e);
        }
    }

    /**
     * Subscribes {@code notification} to child changes of {@code path}.
     *
     * <p>The first subscription for a path creates and starts its {@link PathChildrenCache};
     * later subscriptions reuse the existing listener. The notification is first invoked with the
     * current children (a failure to read them is only logged) and is then added to the listener.
     * A child event fired between that initial snapshot and the registration is not delivered to
     * this notification.
     *
     * @param path         path whose children are watched, must not be empty
     * @param notification callback receiving the child list
     * @throws IllegalArgumentException if {@code path} is empty
     * @throws IllegalStateException    if the cache for a new path cannot be started
     */
    @Override
    public void subscribe(String path,Notification notification) {
        validatePath(path);
        // Single get() instead of containsKey()+get(): the map may be cleared concurrently by stop().
        PathChildrenCacheListenerImpl pathChildrenCacheListener = subscribeMap.get(path);
        if(pathChildrenCacheListener==null){
            PathChildrenCacheListenerImpl newListener = new PathChildrenCacheListenerImpl(path);
            pathChildrenCacheListener = subscribeMap.putIfAbsent(path,newListener);
            if(pathChildrenCacheListener==null){
                // This thread won the race and owns the creation of the cache.
                pathChildrenCacheListener = newListener;
                startCache(path,newListener);
            }
        }
        try {
            notification.notify(client.getChildren().forPath(path));
        } catch (Exception e) {
            Constants.COMMON_LOGGER.warn("initialize subscribe path [{}] occur an exception", path);
            Constants.COMMON_LOGGER.warn("case by",e);
        }
        pathChildrenCacheListener.addNotification(notification);

    }

    /**
     * Creates, starts and tracks the cache for a newly subscribed path. If the start fails the
     * listener mapping is removed again, so a later subscribe can retry instead of finding a
     * listener that has no cache behind it.
     */
    private void startCache(String path, PathChildrenCacheListenerImpl listener){
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client,path,false);
        pathChildrenCache.getListenable().addListener(listener);
        try {
            pathChildrenCache.start();
        } catch (Exception e) {
            subscribeMap.remove(path,listener);
            try {
                pathChildrenCache.close();
            } catch (Exception closeFailure) {
                Constants.COMMON_LOGGER.warn("case by",closeFailure);
            }
            throw new IllegalStateException("subscribe path "+path+" occur an exception",e);
        }
        // The queue is unbounded, so add() cannot fail and, unlike put(), is not interruptible.
        pathChildrenCaches.add(pathChildrenCache);
    }

    /** Deletes the node at {@code path}; failures are logged and not propagated. */
    @Override
    public void unregister(String path) {
        try {
            client.delete().forPath(path);
        } catch (Exception e) {
            Constants.COMMON_LOGGER.warn("unregister path [{}] occur an exception", path);
            Constants.COMMON_LOGGER.warn("case by",e);
        }
    }

    /**
     * Sets the data of the node at {@code path} inside a Curator transaction. Does nothing when
     * the registry is not running; failures are logged and not propagated.
     */
    @Override
    public void updateData(String path, byte[] data) {
        if(!isRunning()){
            return;
        }
        try {
            client.inTransaction().setData().forPath(path,data).and().commit();
        } catch (Exception e) {
            Constants.COMMON_LOGGER.warn("failed to set data ", e);
        }
    }

    /**
     * Reads the data of the node at {@code path}.
     *
     * @return the node data, or an empty array when the registry is not running or the read fails
     */
    @Override
    public byte[] getData(String path) {
        if(isRunning()){
            try {
                return client.getData().forPath(path);
            } catch (Exception e) {
                Constants.COMMON_LOGGER.warn("failed to get data ",e);
            }
        }
        return new byte[0];
    }

    /**
     * Lists the children of {@code path}.
     *
     * @throws RuntimeException wrapping any failure reported by the client
     */
    @Override
    public List<String> listChildren(String path) {
        try {
            return client.getChildren().forPath(path);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Stops the registry: marks it as not running, closes every path cache created by this
     * instance, forgets all subscriptions and finally closes the client.
     */
    @Override
    public void stop() {
        Constants.COMMON_LOGGER.info("stopping registry......");
        running=false;
        // Close the caches while the client is still open, then release the client itself.
        for(PathChildrenCache pathChildrenCache:pathChildrenCaches){
            try {
                pathChildrenCache.close();
            } catch (Exception e) {
                Constants.COMMON_LOGGER.warn("case by",e);
            }
        }
        pathChildrenCaches.clear();
        subscribeMap.clear();
        client.close();
        Constants.COMMON_LOGGER.info("stopped registry......");
    }

    private boolean isRunning(){
        return running;
    }

    /**
     * Cache listener for one subscribed path; fans every child event out to the registered
     * notifications. Access to the notification list is guarded by {@code notificationsLock}.
     */
    class PathChildrenCacheListenerImpl implements PathChildrenCacheListener{

        private final List<Notification> notifications = new ArrayList<Notification>();

        private final String subscribePath;

        private final Lock notificationsLock = new ReentrantLock();

        public PathChildrenCacheListenerImpl(String subscribePath) {
            this.subscribePath = subscribePath;
        }

        /**
         * Registers a notification for future child events; ignored when the registry is stopped.
         *
         * @throws IllegalStateException if the thread is interrupted while waiting for the lock
         */
        public void addNotification(Notification notification){
            if(!isRunning()){
                return ;
            }
            // Acquire outside the try/finally: unlock() must only run once the lock is held,
            // otherwise it throws IllegalMonitorStateException and hides the real failure.
            try {
                notificationsLock.lockInterruptibly();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("add notification occur an exception",e);
            }
            try {
                notifications.add(notification);
            } finally {
                notificationsLock.unlock();
            }
        }

        /** Broadcasts the current children for add/remove/update/initialized events. */
        @Override
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
            switch (event.getType()){
                case CHILD_ADDED:
                case CHILD_REMOVED:
                case CHILD_UPDATED:
                case INITIALIZED:{
                    notify(client,event);
                    break;
                }
                default:{
                    break;
                }
            }

        }

        /**
         * Reads the current children of the subscribed path and passes them to every registered
         * notification while holding the notifications lock. Does nothing once the registry has
         * been stopped.
         *
         * @throws IllegalStateException if the lock wait is interrupted, the children cannot be
         *                               read, or a notification fails
         */
        private void notify(CuratorFramework client, PathChildrenCacheEvent event){
            try {
                notificationsLock.lockInterruptibly();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("broadcast children change occur an exception",e);
            }
            try {
                if(!isRunning()){
                    return ;
                }
                List<String> rawChildren = client.getChildren().forPath(subscribePath);
                // Copy into a fresh list rather than mutating the list returned by the client.
                List<String> children = new ArrayList<String>(rawChildren.size());
                for(String child:rawChildren){
                    children.add(StringUtils.replaceOnce(child,subscribePath,""));
                }
                for(Notification notification:notifications){
                    notification.notify(children);
                }
            } catch (Exception e) {
                if(e instanceof InterruptedException){
                    Thread.currentThread().interrupt();
                }
                throw new IllegalStateException("broadcast children change occur an exception",e);
            } finally {
                notificationsLock.unlock();
            }
        }
    }

}
